// bthread - A M:N threading library to make applications more concurrent.
// Copyright (c) 2014 Baidu, Inc.
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 22 17:30:12 CST 2014

#include "butil/atomicops.h"                // butil::atomic
#include "butil/scoped_lock.h"              // BAIDU_SCOPED_LOCK
#include "butil/macros.h"
#include "butil/containers/linked_list.h"   // LinkNode
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
#include "butil/memory/singleton_on_pthread_once.h"
#endif
#include "butil/logging.h"
#include "butil/object_pool.h"
#include "bthread/errno.h"                 // EWOULDBLOCK, ESTOP
#include "bthread/sys_futex.h"             // futex_*
#include "bthread/processor.h"             // cpu_relax
#include "bthread/task_control.h"          // TaskControl
#include "bthread/task_group.h"            // TaskGroup
#include "bthread/timer_thread.h"
#include "bthread/butex.h"
#include "bthread/mutex.h"

// This file implements butex.h
// Provides futex-like semantics which is sequenced wait and wake operations
// and guaranteed visibilities.
//
// If wait is sequenced before wake:
//    [thread1]             [thread2]
//    wait()                value = new_value
//                          wake()
// wait() sees unmatched value(fail to wait), or wake() sees the waiter.
//
// If wait is sequenced after wake:
//    [thread1]             [thread2]
//                          value = new_value
//                          wake()
//    wait()
// wake() must provide some sort of memory fence to prevent assignment
// of value to be reordered after it. Thus the value is visible to wait()
// as well.

namespace bthread {

#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
struct ButexWaiterCount : public bvar::Adder<int64_t> {
    ButexWaiterCount() : bvar::Adder<int64_t>("bthread_butex_waiter_count") {}
};
inline bvar::Adder<int64_t>& butex_waiter_count() {
    return *butil::get_leaky_singleton<ButexWaiterCount>();
}
#endif

// Implemented in task_group.cpp
int stop_and_consume_butex_waiter(bthread_t tid, ButexWaiter** pw);
int set_butex_waiter(bthread_t tid, ButexWaiter* w);

// If a thread would suspend for less than so many microseconds, return
// ETIMEDOUT directly.
// Use 1: sleeping for less than 2 microsecond is inefficient and useless.
static const int64_t MIN_SLEEP_US = 2; 

enum WaiterState {
    WAITER_STATE_NONE,
    WAITER_STATE_TIMED,
    WAITER_STATE_CANCELLED,
    WAITER_STATE_TIMEDOUT
};

struct Butex;

struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
    // tids of pthreads are 0
    bthread_t tid;

    // Erasing node from middle of LinkedList is thread-unsafe, we need
    // to hold its container's lock.
    butil::atomic<Butex*> container;
};

// non_pthread_task allocates this structure on stack and queue it in
// Butex::waiters.
struct ButexBthreadWaiter : public ButexWaiter {
    TaskMeta* task_meta;
    TimerThread::TaskId sleep_id;
    WaiterState waiter_state;
    int expected_value;
    Butex* initial_butex;
    TaskControl* control;
};

// pthread_task or main_task allocates this structure on stack and queue it
// in Butex::waiters.
struct ButexPthreadWaiter : public ButexWaiter {
    butil::atomic<int> sig;
};

typedef butil::LinkedList<ButexWaiter> ButexWaiterList;

enum BUTEX_PTHREAD_SIGNAL {
    NOT_SIGNALLED = 0,
    SIGNALLED = 1,
    SAFE_TO_DESTROY
};

struct BAIDU_CACHELINE_ALIGNMENT Butex {
    Butex() {}
    ~Butex() {}

    butil::atomic<int> value;
    ButexWaiterList waiters;
    internal::FastPthreadMutex waiter_lock;
};

BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
BAIDU_CASSERT(sizeof(Butex) == BAIDU_CACHELINE_SIZE, butex_fits_in_one_cacheline);

void wakeup_pthread(ButexPthreadWaiter* pw) {
    // release fence makes wait_pthread see other changes when it sees new sig
    pw->sig.store(SAFE_TO_DESTROY, butil::memory_order_release);
    // At this point, *pw is possibly destroyed if wait_pthread has woken up and
    // seen the new sig. As the futex_wake_private just check the accessibility
    // of the memory and returnes EFAULT in this case, it's just fine.
    // If crash happens in the future, we can make pw as tls and never
    // destroyed to resolve this issue.
    futex_wake_private(&pw->sig, 1);
}

bool erase_from_butex(ButexWaiter*, bool);

int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
    int expected_value = NOT_SIGNALLED;
    while (true) {
        const int rc = futex_wait_private(&pw.sig, expected_value, ptimeout);
        // Accquire fence makes this thread sees other changes when it sees
        // the new |sig|
        if (expected_value != pw.sig.load(butil::memory_order_acquire)) {
            // After this routine returns, |pw| will be destroyed while the wake
            // thread possibly still uses it. See the comments in wakeup_pthread
            return rc;
        }
        if (rc != 0 && errno == ETIMEDOUT) {
            // Remove pw from waiters to make sure no one would wakeup pw after
            // this function returnes.
            if (!erase_from_butex(&pw, false)) {
                // Another thread holds pw, attemping to signal it, spin until
                // it's safe to destroy pw
                // Make sure this thread sees the lastest changes when sig is
                // set to SAFE_TO_DESTROY
                BT_LOOP_WHEN(pw.sig.load(butil::memory_order_acquire) 
                                    != SAFE_TO_DESTROY,
                             30/*nops before sched_yield*/);

            }
            return rc;
        }
    }
}

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;

// Returns 0 when no need to unschedule or successfully unscheduled,
// -1 otherwise.
inline int unsleep_if_necessary(ButexBthreadWaiter* w,
                                TimerThread* timer_thread) {
    if (!w->sleep_id) {
        return 0;
    }
    if (timer_thread->unschedule(w->sleep_id) > 0) {
        // the callback is running.
        return -1;
    }
    w->sleep_id = 0;
    return 0;
}

// Using ObjectPool(which never frees memory) to solve the race between
// butex_wake() and butex_destroy(). The race is as follows:
//
//   class Event {
//   public:
//     void wait() {
//       _mutex.lock();
//       if (!_done) {
//         _cond.wait(&_mutex);
//       }
//       _mutex.unlock();
//     }
//     void signal() {
//       _mutex.lock();
//       if (!_done) {
//         _done = true;
//         _cond.signal();
//       }
//       _mutex.unlock();  /*1*/
//     }
//   private:
//     bool _done = false;
//     Mutex _mutex;
//     Condition _cond;
//   };
//
//   [Thread1]                         [Thread2]
//   foo() {
//     Event event;
//     pass_to_thread2(&event);  --->  event.signal();
//     event.wait();
//   } <-- event destroyed
//   
// Summary: Thread1 passes a stateful condition to thread2 and wait until
// the the condition is signalled, which basically means the associated
// job is done and Thread1 can release related resources including the mutex
// and condition. The scenario is fine and code is correct.
// The race needs a closer look. If we look into the unlock at /*1*/, it
// may have different implementations, but the last step is probably an
// atomic store and butex_wake(), like this:
//
//   locked->store(0);
//   butex_wake(locked);
//
// `locked' represents the locking status of the mutex. The issue is that
// just after the store(), the mutex is already unlocked, the code in
// Event.wait() may successfully grab the lock and go through everything
// left and leave foo() function, destroying the mutex and butex, thus the
// butex_wake(locked) may crash.
// To solve this issue, one method is to add reference before store and
// release the reference after butex_wake. However this kind of
// reference-counting needs to be added in nearly every user scenario of
// butex_wake(), which is very error-prone. Another method is never freeing
// butex, the side effect is that butex_wake() may wake up an unrelated
// butex(the one reuses the memory) and cause spurious wakeups. According
// to our observations, the race is infrequent, even rare. The extra spurious
// wakeup should be acceptable.

void* butex_create() {
    Butex* b = butil::get_object<Butex>();
    if (b) {
        return &b->value;
    }
    return NULL;
}

void butex_destroy(void* butex) {
    if (!butex) {
        return;
    }
    Butex* b = static_cast<Butex*>(
        container_of(static_cast<butil::atomic<int>*>(butex), Butex, value));
    butil::return_object(b);
}

inline TaskGroup* get_task_group(TaskControl* c) {
    TaskGroup* g = tls_task_group;
    return g ? g : c->choose_one_group();
}

int butex_wake(void* arg) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    ButexWaiter* front = NULL;
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->waiters.empty()) {
            return 0;
        }
        front = b->waiters.head()->value();
        front->RemoveFromList();
        front->container.store(NULL, butil::memory_order_relaxed);
    }
    if (front->tid == 0) {
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());
    TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, bbw->tid);
    } else {
        bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
    }
    return 1;
}

int butex_wake_all(void* arg) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                bthread_waiters.Append(bw);
            } else {
                pthread_waiters.Append(bw);
            }
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }
    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    // We will exchange with first waiter in the end.
    ButexBthreadWaiter* next = static_cast<ButexBthreadWaiter*>(
        bthread_waiters.head()->value());
    next->RemoveFromList();
    unsleep_if_necessary(next, get_global_timer_thread());
    ++nwakeup;
    TaskGroup* g = get_task_group(next->control);
    const int saved_nwakeup = nwakeup;
    while (!bthread_waiters.empty()) {
        // pop reversely
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
            bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
        g->ready_to_run_general(w->tid, true);
        ++nwakeup;
    }
    if (saved_nwakeup != nwakeup) {
        g->flush_nosignal_tasks_general();
    }
    if (g == tls_task_group) {
        TaskGroup::exchange(&g, next->tid);
    } else {
        g->ready_to_run_remote(next->tid);
    }
    return nwakeup;
}

int butex_wake_except(void* arg, bthread_t excluded_bthread) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        ButexWaiter* excluded_waiter = NULL;
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();

            if (bw->tid) {
                if (bw->tid != excluded_bthread) {
                    bthread_waiters.Append(bw);
                    bw->container.store(NULL, butil::memory_order_relaxed);
                } else {
                    excluded_waiter = bw;
                }
            } else {
                bw->container.store(NULL, butil::memory_order_relaxed);
                pthread_waiters.Append(bw);
            }
        }

        if (excluded_waiter) {
            b->waiters.Append(excluded_waiter);
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }

    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    ButexBthreadWaiter* front = static_cast<ButexBthreadWaiter*>(
                bthread_waiters.head()->value());

    TaskGroup* g = get_task_group(front->control);
    const int saved_nwakeup = nwakeup;
    do {
        // pop reversely
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
            bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
        g->ready_to_run_general(w->tid, true);
        ++nwakeup;
    } while (!bthread_waiters.empty());
    if (saved_nwakeup != nwakeup) {
        g->flush_nosignal_tasks_general();
    }
    return nwakeup;
}

int butex_requeue(void* arg, void* arg2) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    Butex* m = container_of(static_cast<butil::atomic<int>*>(arg2), Butex, value);

    ButexWaiter* front = NULL;
    {
        std::unique_lock<internal::FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
        std::unique_lock<internal::FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
        butil::double_lock(lck1, lck2);
        if (b->waiters.empty()) {
            return 0;
        }

        front = b->waiters.head()->value();
        front->RemoveFromList();
        front->container.store(NULL, butil::memory_order_relaxed);

        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            m->waiters.Append(bw);
            bw->container.store(m, butil::memory_order_relaxed);
        }
    }

    if (front->tid == 0) {  // which is a pthread
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());
    TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, front->tid);
    } else {
        bbw->control->choose_one_group()->ready_to_run_remote(front->tid);
    }
    return 1;
}

// Callable from multiple threads, at most one thread may wake up the waiter.
static void erase_from_butex_and_wakeup(void* arg) {
    erase_from_butex(static_cast<ButexWaiter*>(arg), true);
}

inline bool erase_from_butex(ButexWaiter* bw, bool wakeup) {
    // `bw' is guaranteed to be valid inside this function because waiter
    // will wait until this function being cancelled or finished.
    // NOTE: This function must be no-op when bw->container is NULL.
    bool erased = false;
    Butex* b;
    int saved_errno = errno;
    while ((b = bw->container.load(butil::memory_order_acquire))) {
        // b can be NULL when the waiter is scheduled but queued.
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b == bw->container.load(butil::memory_order_relaxed)) {
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                static_cast<ButexBthreadWaiter*>(bw)->waiter_state = WAITER_STATE_TIMEDOUT;
            }
            erased = true;
            break;
        }
    }
    if (erased && wakeup) {
        if (bw->tid) {
            ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw);
            get_task_group(bbw->control)->ready_to_run_general(bw->tid);
        } else {
            ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
            wakeup_pthread(pw);
        }
    }
    errno = saved_errno;
    return erased;
}

static void wait_for_butex(void* arg) {
    ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
    Butex* const b = bw->initial_butex;
    // 1: waiter with timeout should have waiter_state == WAITER_STATE_TIMED
    //    before they're are queued, otherwise the waiter is already timedout
    //    and removed by TimerThread, in which case we should stop queueing.
    //
    // Visibility of waiter_state:
    //    [bthread]                         [TimerThread]
    //    waiter_state = TIMED
    //    tt_lock { add task }
    //                                      tt_lock { get task }
    //                                      waiter_lock { waiter_state=TIMEDOUT }
    //    waiter_lock { use waiter_state }
    // tt_lock represents TimerThread::_mutex. Obviously visibility of
    // waiter_state are sequenced by two locks, both threads are guaranteed to
    // see the correct value.
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->value.load(butil::memory_order_relaxed) == bw->expected_value &&
            bw->waiter_state != WAITER_STATE_TIMEDOUT/*1*/ &&
            (!bw->task_meta->stop || !bw->task_meta->interruptible)) {
            b->waiters.Append(bw);
            bw->container.store(b, butil::memory_order_relaxed);
            return;
        }
    }
    
    // b->container is NULL which makes erase_from_butex_and_wakeup() and
    // stop_butex_wait() no-op, there's no race between following code and
    // the two functions. The on-stack ButexBthreadWaiter is safe to use and
    // bw->waiter_state will not change again.
    unsleep_if_necessary(bw, get_global_timer_thread());
    if (bw->waiter_state != WAITER_STATE_TIMEDOUT) {
        bw->waiter_state = WAITER_STATE_CANCELLED;
    }
    tls_task_group->ready_to_run(bw->tid);
    // FIXME: jump back to original thread is buggy.
    
    // // Value unmatched or waiter is already woken up by TimerThread, jump
    // // back to original bthread.
    // TaskGroup* g = tls_task_group;
    // ReadyToRunArgs args = { g->current_tid(), false };
    // g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
    // // 2: Don't run remained because we're already in a remained function
    // //    otherwise stack may overflow.
    // TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}

static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
                                   const timespec* abstime) {
    // sys futex needs relative timeout.
    // Compute diff between abstime and now.
    timespec* ptimeout = NULL;
    timespec timeout;
    if (abstime != NULL) {
        const int64_t timeout_us = butil::timespec_to_microseconds(*abstime) -
            butil::gettimeofday_us();
        if (timeout_us < MIN_SLEEP_US) {
            errno = ETIMEDOUT;
            return -1;
        }
        timeout = butil::microseconds_to_timespec(timeout_us);
        ptimeout = &timeout;
    }

    TaskMeta* task = NULL;
    bool set_waiter = false;
    ButexPthreadWaiter pw;
    pw.tid = 0;
    pw.sig.store(NOT_SIGNALLED, butil::memory_order_relaxed);
    int rc = 0;
    
    if (g) {
        task = g->current_task();
        if (task->interruptible) {
            if (task->stop) {
                errno = ESTOP;
                return -1;
            }
            set_waiter = true;
            task->current_waiter.store(&pw, butil::memory_order_release);
        }
    }
    b->waiter_lock.lock();
    if (b->value.load(butil::memory_order_relaxed) == expected_value) {
        b->waiters.Append(&pw);
        pw.container.store(b, butil::memory_order_relaxed);
        b->waiter_lock.unlock();

#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
        num_waiters << 1;
#endif
        rc = wait_pthread(pw, ptimeout);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        num_waiters << -1;
#endif
    } else {
        b->waiter_lock.unlock();
        errno = EWOULDBLOCK;
        rc = -1;
    }
    if (task) {
        if (set_waiter) {
            // If current_waiter is NULL, stop_butex_wait() is running and
            // using pw, spin until current_waiter != NULL.
            BT_LOOP_WHEN(task->current_waiter.exchange(
                             NULL, butil::memory_order_acquire) == NULL,
                         30/*nops before sched_yield*/);
        }
        if (task->stop) {
            errno = ESTOP;
            return -1;
        }
    }
    return rc;
}

int butex_wait(void* arg, int expected_value, const timespec* abstime) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
        errno = EWOULDBLOCK;
        // Sometimes we may take actions immediately after unmatched butex,
        // this fence makes sure that we see changes before changing butex.
        butil::atomic_thread_fence(butil::memory_order_acquire);
        return -1;
    }
    TaskGroup* g = tls_task_group;
    if (NULL == g || g->is_current_pthread_task()) {
        return butex_wait_from_pthread(g, b, expected_value, abstime);
    }
    ButexBthreadWaiter bbw;
    // tid is 0 iff the thread is non-bthread
    bbw.tid = g->current_tid();
    bbw.container.store(NULL, butil::memory_order_relaxed);
    bbw.task_meta = g->current_task();
    bbw.sleep_id = 0;
    bbw.waiter_state = WAITER_STATE_NONE;
    bbw.expected_value = expected_value;
    bbw.initial_butex = b;
    bbw.control = g->control();

    if (abstime != NULL) {
        // Schedule timer before queueing. If the timer is triggered before
        // queueing, cancel queueing. This is a kind of optimistic locking.
        bbw.waiter_state = WAITER_STATE_TIMED;
        // Already timed out.
        if (butil::timespec_to_microseconds(*abstime) <
            (butil::gettimeofday_us() + MIN_SLEEP_US)) {
            errno = ETIMEDOUT;
            return -1;
        }
        bbw.sleep_id = get_global_timer_thread()->schedule(
            erase_from_butex_and_wakeup, &bbw, *abstime);
        if (!bbw.sleep_id) {  // TimerThread stopped.
            errno = ESTOP;
            return -1;
        }
    }
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
    bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
    num_waiters << 1;
#endif

    // release fence matches with acquire fence in stop_and_consume_butex_waiter
    // in task_group.cpp to guarantee visibility of `interruptible'.
    bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
    g->set_remained(wait_for_butex, &bbw);
    TaskGroup::sched(&g);

    // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
    // running and using bbw. The chance is small, just spin until it's done.
    BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
                 30/*nops before sched_yield*/);
    
    // If current_waiter is NULL, stop_butex_wait() is running and using bbw.
    // Spin until current_waiter != NULL.
    BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
                     NULL, butil::memory_order_acquire) == NULL,
                 30/*nops before sched_yield*/);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
    num_waiters << -1;
#endif

    // ESTOP has highest priority.
    if (bbw.task_meta->stop) {
        errno = ESTOP;
        return -1;
    }
    // If timed out as well as value unmatched, return ETIMEDOUT.
    if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
        errno = ETIMEDOUT;
        return -1;
    } else if (WAITER_STATE_CANCELLED == bbw.waiter_state) {
        errno = EWOULDBLOCK;
        return -1;
    }
    return 0;
}

int butex_wait_uninterruptible(void* arg, int expected_value, const timespec* abstime) {
    TaskGroup* g = tls_task_group;
    TaskMeta* caller = NULL;
    bool saved_interruptible = true;
    if (NULL != g) {
        caller = g->current_task();
        saved_interruptible = caller->interruptible;
        caller->interruptible = false;
    }
    const int rc = butex_wait(arg, expected_value, abstime);
    if (caller) {
        caller->interruptible = saved_interruptible;
    }
    return rc;
}

int stop_butex_wait(bthread_t tid) {
    // Consume current_waiter in the TaskMeta, wake it up then set it back.
    ButexWaiter* w = NULL;
    if (stop_and_consume_butex_waiter(tid, &w) < 0) {
        return -1;
    }
    if (w != NULL) {
        erase_from_butex(w, true);
        // If butex_wait() already wakes up before we set current_waiter back,
        // the function will spin until current_waiter becomes non-NULL.
        if (__builtin_expect(set_butex_waiter(tid, w) < 0, 0)) {
            LOG(FATAL) << "butex_wait should spin until setting back waiter";
            return -1;
        }
    }
    return 0;
}

}  // namespace bthread

namespace butil {
template <> struct ObjectPoolBlockMaxItem<bthread::Butex> {
    static const size_t value = 128;
};
}
